/*

其中设备码: box_9SLAD6CH订阅列表
DEVICE/BASE/MAGUS/cloud/9SLAD6CH/das/cmd/#
DEVICE/BASE/MAGUS/cloud/9SLAD6CH/das/state/request
DEVICE/BASE/MAGUS/cloud/9SLAD6CHDEVICE/license/update
DEVICE/BASE/MAGUS/cloud/9SLAD6CHDEVICE/reboot
DEVICE/BASE/MAGUS/cloud/9SLAD6CHDEVICE/time/update
DEVICE/BASE/MAGUS/cloud/9SLAD6CHDEVICEinfo/config/request
DEVICE/BASE/MAGUS/cloud/9SLAD6CHDEVICEinfo/monitor/driver_list
DEVICE/BASE/MAGUS/cloud/9SLAD6CHDEVICEinfo/monitor/request
DEVICE/BASE/MAGUS/cloud/9SLAD6CH/log/log_list
DEVICE/BASE/MAGUS/cloud/9SLAD6CH/log/log_pull
DEVICE/BASE/MAGUS/cloud/9SLAD6CH/log/monitor
DEVICE/BASE/MAGUS/cloud/9SLAD6CH/message/his_alarm/request
DEVICE/BASE/MAGUS/cloud/9SLAD6CH/ota/upgrade
DEVICE/BASE/MAGUS/cloud/9SLAD6CH/point/event/control
DEVICE/BASE/MAGUS/cloud/9SLAD6CH/point/event/his_recall
DEVICE/BASE/MAGUS/cloud/9SLAD6CH/register
DEVICE/BASE/MAGUS/cloud/9SLAD6CH/sync/full_request
DEVICE/BASE/MAGUS/cloud/9SLAD6CH/sync/request

aim:
模拟设备码前缀为 CODE_0 ~ CODE_999

*/

package main

import (
	"fmt"
	mqtt "github.com/eclipse/paho.mqtt.golang"
	"os"
	"strconv"
	"time"
)


var f *os.File
var ready chan struct{} // 准备开始

var f2 mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
	parseInt, err := strconv.ParseInt(string(msg.Payload()), 0, 0)
	if err != nil {
		fmt.Println(err)
		return
	}

	s := "纳秒: " +  strconv.FormatInt(time.Now().UnixNano() - parseInt, 10) + "\n"

	fmt.Println(s)

	f.Write([]byte(s))
}

func sub()  {
	staticId := "CODE_"
	for i := 0; i < 800;i++ {
		opts := mqtt.NewClientOptions().AddBroker("tcp://127.0.0.1:1883").SetKeepAlive(time.Second * 120)
		opts.SetClientID(staticId + strconv.Itoa(i))

		c := mqtt.NewClient(opts)
		token := c.Connect()
		if token.Wait() && token.Error() != nil {
			panic(token.Error())
		}

		code := staticId + strconv.Itoa(i)

		if token = c.Subscribe("DEVICE/BASE/MAGUS/cloud/" + code + "/das/cmd/#", 0, f2); token.Wait() && token.Error() != nil {
			fmt.Println(token.Error())
			return
		}
		if token = c.Subscribe("DEVICE/BASE/MAGUS/cloud/" + code + "/das/state/request/#", 0, f2); token.Wait() && token.Error() != nil {
			fmt.Println(token.Error())
			return
		}
		if token = c.Subscribe("DEVICE/BASE/MAGUS/cloud/" + code + "/DEVICE/license/update/#", 0, f2); token.Wait() && token.Error() != nil {
			fmt.Println(token.Error())
			return
		}
		if token = c.Subscribe("DEVICE/BASE/MAGUS/cloud/" + code + "/DEVICE/reboot/#", 0, f2); token.Wait() && token.Error() != nil {
			fmt.Println(token.Error())
			return
		}
		if token = c.Subscribe("DEVICE/BASE/MAGUS/cloud/" + code + "/DEVICE/time/update/#", 0, f2); token.Wait() && token.Error() != nil {
			fmt.Println(token.Error())
			return
		}
		if token = c.Subscribe("DEVICE/BASE/MAGUS/cloud/" + code + "/DEVICE/info/config/request/#", 0, f2); token.Wait() && token.Error() != nil {
			fmt.Println(token.Error())
			return
		}
		if token = c.Subscribe("DEVICE/BASE/MAGUS/cloud/" + code + "/DEVIC/Einfo/monitor/driver_list/#", 0, f2); token.Wait() && token.Error() != nil {
			fmt.Println(token.Error())
			return
		}
		if token = c.Subscribe("DEVICE/BASE/MAGUS/cloud/" + code + "/DEVICE/info/monitor/request/#", 0, f2); token.Wait() && token.Error() != nil {
			fmt.Println(token.Error())
			return
		}
		if token = c.Subscribe("DEVICE/BASE/MAGUS/cloud/" + code + "/log/log_list/#", 0, f2); token.Wait() && token.Error() != nil {
			fmt.Println(token.Error())
			return
		}
		if token = c.Subscribe("DEVICE/BASE/MAGUS/cloud/" + code + "/log/log_pull/#", 0, f2); token.Wait() && token.Error() != nil {
			fmt.Println(token.Error())
			return
		}
		if token = c.Subscribe("DEVICE/BASE/MAGUS/cloud/" + code + "/log/monitor/#", 0, f2); token.Wait() && token.Error() != nil {
			fmt.Println(token.Error())
			return
		}
		if token = c.Subscribe("DEVICE/BASE/MAGUS/cloud/" + code + "/message/his_alarm/request/#", 0, f2); token.Wait() && token.Error() != nil {
			fmt.Println(token.Error())
			return
		}
		if token = c.Subscribe("DEVICE/BASE/MAGUS/cloud/" + code + "/ota/upgrade/#", 0, f2); token.Wait() && token.Error() != nil {
			fmt.Println(token.Error())
			return
		}
		if token = c.Subscribe("DEVICE/BASE/MAGUS/cloud/" + code + "/point/event/control/#", 0, f2); token.Wait() && token.Error() != nil {
			fmt.Println(token.Error())
			return
		}
		if token = c.Subscribe("DEVICE/BASE/MAGUS/cloud/" + code + "/point/event/his_recall/#", 0, f2); token.Wait() && token.Error() != nil {
			fmt.Println(token.Error())
			return
		}
		if token = c.Subscribe("DEVICE/BASE/MAGUS/cloud/" + code + "/register/#", 0, f2); token.Wait() && token.Error() != nil {
			fmt.Println(token.Error())
			return
		}
		if token = c.Subscribe("DEVICE/BASE/MAGUS/cloud/" + code + "/sync/full_request/#", 0, f2); token.Wait() && token.Error() != nil {
			fmt.Println(token.Error())
			return
		}
		if token = c.Subscribe("DEVICE/BASE/MAGUS/cloud/" + code + "/sync/request/#", 0, f2); token.Wait() && token.Error() != nil {
			fmt.Println(token.Error())
			return
		}

		time.Sleep(time.Millisecond)
		fmt.Println(i)
	}

	ready <- struct{}{}
}

func pub()  {
	tm := time.Now()
	<-ready
	fmt.Println("时间: ", time.Since(tm).String())

	fmt.Println("开始成产数据")
	opts := mqtt.NewClientOptions().AddBroker("tcp://127.0.0.1:1883")
	opts.SetClientID("pub_once")

	c := mqtt.NewClient(opts)
	token := c.Connect()
	if token.Wait() && token.Error() != nil {
		panic(token.Error())
	}

	for {
		time.Sleep(time.Second)

		b := []byte(strconv.FormatInt(time.Now().UnixNano(), 10))

		token2 := c.Publish("DEVICE/BASE/MAGUS/cloud/CODE_10/das/cmd/1/2", 0, false, b)
		token2.Wait()
	}
}

func main() {
	f, _ = os.OpenFile("sb.log", os.O_CREATE|os.O_APPEND|os.O_RDWR, 0660)
	go func() {
		for {
			time.Sleep(time.Second)
			f.Sync()
		}
	}()

	ready = make(chan struct{}, 1)

	go sub()
	go pub()

	select {
	}
}
